Source code for hysop.operator.gradient

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
@file gradient.py
Gradient: compute dFi/dXj for a given field, up to all components in all directions.
MinMaxGradientStatistics: compute min(dFi/dXj), max(dFi/dXj) and/or max(|dFi/dXj|)
                          for a given field, up to all components in all directions.
"""
from hysop import vprint
from hysop.constants import Implementation, DirectionLabels, TranspositionState
from hysop.fields.continuous_field import Field, ScalarField, TensorField
from hysop.tools.htypes import check_instance, first_not_None, to_tuple
from hysop.tools.decorators import debug
from hysop.tools.numpywrappers import npw
from hysop.core.graph.graph import op_apply
from hysop.core.graph.computational_operator import ComputationalGraphOperator
from hysop.core.graph.computational_node import ComputationalGraphNode
from hysop.core.graph.computational_node_frontend import ComputationalGraphNodeFrontend
from hysop.core.graph.node_generator import ComputationalGraphNodeGenerator
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.operator.directional.directional import DirectionalOperatorGeneratorI
from hysop.operator.derivative import (
    SpaceDerivative,
    MultiSpaceDerivatives,
    FiniteDifferencesSpaceDerivative,
    SpectralSpaceDerivative,
)
from hysop.operator.min_max import (
    MinMaxDerivativeStatistics,
    MinMaxFiniteDifferencesDerivativeStatistics,
    MinMaxSpectralDerivativeStatistics,
)
from hysop.parameters.scalar_parameter import ScalarParameter
from hysop.parameters.tensor_parameter import TensorParameter


[docs] class Gradient(MultiSpaceDerivatives): """ Generate multiple SpaceDerivative operators to compute the gradient of a Field. """
[docs] @classmethod def implementations(cls): return SpaceDerivative.implementations()
@debug def __new__( mcls, F, gradF, directions=None, implementation=None, cls=FiniteDifferencesSpaceDerivative, base_kwds=None, **kwds, ): base_kwds = {} if (base_kwds is None) else base_kwds base_kwds.update( dict(candidate_input_tensors=None, candidate_output_tensors=None) ) return super().__new__( mcls, Fs=None, dFs=None, cls=cls, candidate_input_tensors=(F,), candidate_output_tensors=(gradF,), derivatives=None, directions=directions, implementation=implementation, base_kwds=base_kwds, **kwds, ) @debug def __init__( self, F, gradF, directions=None, implementation=None, cls=FiniteDifferencesSpaceDerivative, base_kwds=None, **kwds, ): """ Create an operator generator that yields a sequence of operators that compute the gradient of an input field F. Given F, a scalar, vector or tensor field of dimension n, compute the field of dimension n+1 that is the gradient of F: ScalarField: F -> gradF[j] = dF/dxj VectorField: F -> gradF[i,j] = dFi/dxj TensorField: F -> gradF[i0,...,in,j] = dF[i0,...,in]/dxj Derivatives can be computed with respect to specific directions and not necessarily in all directions. To restrict the number of components, take a tensor view on F (and gradF). Example: if F is a VectorField of m components (F0, ..., Fm) in a domain of dimension n, this operator will compute gradF[i,j] = dF[i]/dx[j]. ( dF0/dx0 ... dF0/dxn ) ( . . . ) grad(F) = ( . . . ) ( . . . ) ( dFm/dx0 ... dFm/dxn ) where F is an input field gradF is an output field != F 0 <= i < nb_components 0 <= j < nb_directions nb_components = F.nb_components nb_directions = min( F.dim, len(directions)) Parameters ---------- F: hysop.field.continuous_field.Field Continuous field as input (Scalar, Vector or TensorField). All contained field have to live on the same domain. gradF: hysop.field.continuous_field.Field Continuous field to be written, should have exactly shape F.shape + (ndirections,) directions: tuple of ints, optional The directions in which to take the derivatives. Defaults to range(F.dim). nb_directions = min(F.dim, len(directions)) implementation: Implementation, optional, defaults to None target implementation, should be contained in available_implementations(). If None, implementation will be set to default_implementation(). kwds: dict, optional Extra parameters passed towards base class (MultiSpaceDerivatives). """ base_kwds = first_not_None(base_kwds, {}) check_instance(F, Field) check_instance(gradF, Field) check_instance(directions, tuple, values=int, allow_none=True) directions = to_tuple(first_not_None(directions, range(F.dim))) ndirections = len(directions) if F.is_tensor: nfields = F.size oshape = F.shape + (ndirections,) else: nfields = 1 oshape = (ndirections,) if gradF.is_tensor: if gradF.shape != oshape: msg = "Gradient field shape mismatch, expected {} but got {}." msg = msg.format(oshape, gradF.shape) raise ValueError(msg) else: if oshape != (1,): msg = "Gradient field shape mismatch, expected {} but got a scalar " msg += "field (ie. shape={})." msg = msg.format(oshape, (1,)) raise ValueError(msg) Fs = tuple(f for f in F.fields for d in directions) dFs = gradF.fields directions = tuple(d for _ in range(nfields) for d in directions) derivatives = (1,) * len(directions) base_kwds.update( dict(candidate_input_tensors=(F,), candidate_output_tensors=(gradF,)) ) if not issubclass(cls, (SpaceDerivative, MinMaxDerivativeStatistics)) or ( cls in (SpaceDerivative, MinMaxDerivativeStatistics) ): msg = "cls should be a subclass of SpaceDerivative or MinMaxSpaceDerivativeStatistics, got {}." msg += "\ncls MRO is:\n " msg += "\n ".join(str(t) for t in cls.__mro__) msg = msg.format(cls) raise TypeError(msg) implementation = first_not_None(implementation, cls.default_implementation()) super().__init__( Fs=Fs, dFs=dFs, cls=cls, candidate_input_tensors=(F,), candidate_output_tensors=(gradF,), derivatives=derivatives, directions=directions, implementation=implementation, base_kwds=base_kwds, **kwds, )
[docs] class MinMaxGradientStatistics(Gradient): """ Interface for computing some statistics on the gradient of a field (minimum and maximum) one component at a time to limit memory usage. This will generate multiple MinMaxDerivativeStatistics operators. """ @debug def __new__( mcls, F, gradF=None, directions=None, coeffs=None, Fmin=None, Fmax=None, Finf=None, all_quiet=True, print_tensors=True, name=None, pretty_name=None, pbasename=None, ppbasename=None, variables=None, implementation=None, base_kwds=None, cls=MinMaxFiniteDifferencesDerivativeStatistics, **kwds, ): return super().__new__( mcls, F=F, gradF=gradF, directions=directions, extra_params=None, cls=cls, variables=variables, **kwds, ) @debug def __init__( self, F, gradF=None, directions=None, coeffs=None, Fmin=None, Fmax=None, Finf=None, all_quiet=True, print_tensors=True, name=None, pretty_name=None, pbasename=None, ppbasename=None, variables=None, implementation=None, base_kwds=None, cls=MinMaxFiniteDifferencesDerivativeStatistics, **kwds, ): """ Create an operator generator that yields a sequence of operators that compute statistics on the gradient of an input field F. MinMaxGradientStatistics can compute some commonly used Field statistics: Fmin: component-wise and direction-wise min values of the gradient of the field. Fmax: component-wise and direction-wise max values of the gradient of the field. Finf: component-wise and direction-wise max values of the absolute value of the gradient of the field (computed using Fmin and Fmax). Derivatives can be computed with respect to specific directions and not necessarily in all directions. To restrict the number of components, take a tensor view on F (and gradF). Let k = idx + (j,) gradF[k] = dF[idx]/dXd Fmax[k] = Smin * min( dF[idx]/dXd ) Fmin[k] = Smax * max( dF[idx]/dXd ) Finf[k] = Sinf * max( |Fmin[k]|, |Fmax[k]| ) where F is an input field, nb_components = F.nb_components = np.prod(F.shape) nb_directions = min( F.dim, len(directions)) gradF is an optional output tensor field, idx is contained in numpy.ndindex(*F.shape) 0 <= j < nb_directions d = directions[j] Fmin = created or supplied TensorParameter of shape F.shape + (nb_directions,). Fmax = created or supplied TensorParameter of shape F.shape + (nb_directions,). Finf = created or supplied TensorParameter of shape F.shape + (nb_directions,). Smin = coeffs['Fmin'] or coeffs['Fmin'][k] Smax = coeffs['Fmax'] or coeffs['Fmax'][k] Sinf = coeffs['Finf'] or coeffs['Fmax'][k] All statistics are only computed if explicitely required by user, unless required to compute another required statistic, see Notes. Parameters ---------- F: Field The continuous input field on which the gradient will be taken and statistics will be computed. gradF: Field, optional Optional output field for the gradient. If the gradient is required as an output, one can also use MinMaxStatistics on a precomputed gradient (using the Gradient operator) instead of MinMaxGradientStatistics. directions: array like of ints, optional The directions in which the statistics are computed, defaults to all directions (ie. range(F.dim)). coeffs: dict of scalar or array like of coefficients, optional Optional scaling of the statistics. Scaling factor should be a scalar or an array-like of scalars for each components of gradF. If not given default to 1 for all statistics. F...: TensorParameter or boolean, optional At least one statistic should be specified (either by boolean or TensorParameter). TensorParameters should be of shape F.shape + (nb_directions,), see Notes. If set to True, the TensorParameter will be generated automatically. Autogenerated TensorParameters that are not required by the user but are generated anyway are set to be quiet. All autogenerated TensorParameters can be retrieved as attributes of this object. all_quiet: bool, optional, defaults to True Set all generated params to be quiet, even the ones that are requested explicitely. print_tensors: bool, optional, defaults to True Should the phony operator print the tensor parameters during apply ? name: str, optional Name template for generated operator names. Defaults to MinMax({}) where {} will be replaced by gradF[k].name. pretty_name: str, optional Name template for generaed operatr pretty names. Defaults to |+/-{}| where {} will be replaced by gradF[k].pretty_name. pbasename: str, optional Basename for created tensor parameters. Defaults to gradF.name. ppbasename: str, optional Pretty basename for created tensor parameters. Defaults to gradF.pretty_name. variables: dict Dictionary of fields as keys and topologies as values. implementation: hysop.constants.Implementation, optional Specify generated operator underlying backend implementation. Target implementation, should be contained in MinMaxDerivativeStatistics.available_implementations(). If None, implementation will be set to MinMaxDerivativeStatistics.default_implementation(). base_kwds: dict Base class keyword arguments. kwds: dict Additional kwds passed to chosen implementation. Attributes: ----------- Fmin, Fmax, Finf: TensorParameter All generated tensor parameters. Unused statistics are set to None. Notes ----- nb_components = F.nb_components nb_directions = min(F.dim, len(directions)). About statistics: Finf requires to compute Fmin and Fmax. Finf = Sinf * max( abs(Smin*Fmin), abs(Smax*Fmax)) where Sinf, Smin and Smax are the scaling coefficients defined in coeffs. """ check_instance(F, Field) check_instance(gradF, Field, allow_none=True) check_instance( directions, tuple, values=int, allow_none=True, minval=0, maxval=F.dim - 1, minsize=1, unique=True, ) check_instance( coeffs, dict, keys=str, values=(int, float, npw.number), allow_none=True ) check_instance( variables, dict, keys=Field, values=CartesianTopologyDescriptors, allow_none=True, ) check_instance(name, str, allow_none=True) check_instance(pbasename, str, allow_none=True) check_instance(ppbasename, str, allow_none=True) check_instance(implementation, Implementation, allow_none=True) check_instance(base_kwds, dict, allow_none=True) check_instance(all_quiet, bool, allow_none=True) if ( ((Fmin is None) or (Fmin is False)) and ((Fmax is None) or (Fmax is False)) and ((Finf is None) or (Finf is False)) ): msg = "No statistics were requested." msg += "\nPlease specify Fmin, Fmax and/or Finf by either setting " msg += " their value to True, or by by passing an already existing " msg += " tensor parameter." raise ValueError(msg) coeffs = first_not_None(coeffs, {}) variables = first_not_None(variables, {F: None}) all_quiet = first_not_None(all_quiet, False) directions = to_tuple(first_not_None(directions, range(F.dim))) nb_directions = len(directions) if F.is_tensor: oshape = F.shape + (nb_directions,) else: oshape = (nb_directions,) if gradF is None: gradF = F.gradient(directions=directions, is_tmp=True) assert gradF.shape == oshape, gradF.shape variables.setdefault(gradF, variables[F]) _names = {"Fmin": "{}_min", "Fmax": "{}_max", "Finf": "{}_inf"} _pretty_names = {"Fmin": "{}₋", "Fmax": "{}₊", "Finf": "|{}|₊"} pbasename = first_not_None(pbasename, gradF.name) ppbasename = first_not_None(ppbasename, gradF.pretty_name) names = {k: v.format(pbasename) for (k, v) in _names.items()} pretty_names = {k: v.format(ppbasename) for (k, v) in _pretty_names.items()} def make_param(k, quiet): return TensorParameter( name=names[k], pretty_name=pretty_names[k], dtype=F.dtype, shape=oshape, quiet=quiet, ) parameters = {} _parameters = dict(Fmin=Fmin, Fmax=Fmax, Finf=Finf) for k, v in _parameters.items(): param = _parameters[k] if isinstance(param, TensorParameter): pass elif param is True: param = make_param(k, quiet=all_quiet) elif (_parameters["Finf"] is not None) and ((k == "Fmin") or (k == "Fmax")): param = make_param(k, quiet=True) else: param = None setattr(self, k, param) continue assert param.shape == oshape coeffs.setdefault(k, 1) parameters[k] = param setattr(self, k, param) unused_coeffs = set(coeffs.keys()) - set(parameters.keys()) if unused_coeffs: msg = "The following coefficients are not needed: {}" msg = msg.format(unused_coeffs) raise ValueError(unused_coeffs) name = first_not_None(name, "MinMax({})") pretty_name = first_not_None(pretty_name, "|±{}|") extra_params = { "name": gradF.new_empty_array(), "pretty_name": gradF.new_empty_array(), "coeffs": coeffs, "implementation": implementation, } for idx, Fi in gradF.nd_iter(): for statname, stat in parameters.items(): if stat is None: continue pname = _names[statname].format(Fi.name) ppname = _pretty_names[statname].format(Fi.pretty_name) S = stat.view(idx=idx, name=pname, pretty_name=ppname) stats = extra_params.setdefault(statname, gradF.new_empty_array()) stats[idx] = S extra_params["name"][idx] = name.format(Fi.name) extra_params["pretty_name"][idx] = pretty_name.format(Fi.pretty_name) super().__init__( F=F, gradF=gradF, directions=directions, extra_params=extra_params, cls=cls, variables=variables, **kwds, ) # add a phony operator to gather parameter views class MergeTensorViewsOperator(ComputationalGraphOperator): @classmethod def supports_mpi(cls): return True @op_apply def apply(self, **kwds): super().apply(**kwds) if not print_tensors: return for k, v in _parameters.items(): if (v is not None) and (v is not False): param = parameters[k] msg = f">Parameter {param.pretty_name} set to:\n{param.value}" vprint(msg) _phony_input_params = set() _phony_output_params = set() for pname in _names.keys(): if pname in extra_params: param = parameters[pname] _phony_input_params.update({p for p in extra_params[pname].ravel()}) _phony_output_params.update({param}) op = MergeTensorViewsOperator( name=name.format(gradF.name), pretty_name=pretty_name.format(gradF.pretty_name), input_params=_phony_input_params, output_params=_phony_output_params, ) self._phony_op = op @debug def _generate(self): """Generate all operators.""" operators = super()._generate() operators += (self._phony_op,) return operators
[docs] @debug def generate_direction(self, i, dt_coeff): # See MultiSpaceDerivatives for the directional interface ops = super().generate_direction(i=i, dt_coeff=dt_coeff) if ops and (i == max(self.directions)): ops += (self._phony_op,) return ops